Rebalance: when is it triggered, what does it actually do, and how does the flow work?

When is a rebalance triggered?

  • The group's subscription changes. For example, with a regex-based subscription, the subscription changes whenever a newly created topic matches the pattern (see the sketch after this list).
  • The partition count of a subscribed topic changes, e.g. partitions are added through the command-line tools.
  • Group membership changes: a consumer joins or leaves the group.
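
A minimal sketch of the first trigger, with a hypothetical group id and pattern: once a new topic matching test-.* is created, the group's subscription changes and a rebalance follows.

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "regex-group"); // hypothetical group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // regex subscription: topics created later that match the pattern
    // enter the subscription and trigger a rebalance
    consumer.subscribe(Pattern.compile("test-.*"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
    });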

What does a rebalance actually do?

In one sentence: when multiple Consumers subscribe to the same Topic, the subscribed partitions are redistributed among the consumers according to the configured partition assignment strategy.
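
Which strategy is used is a client-side setting. A minimal sketch of switching from the default RangeAssignor to the round-robin assignor that ships with the client:

    Properties props = new Properties();
    // ... bootstrap.servers, group.id and deserializers as in the examples below ...
    // the default is org.apache.kafka.clients.consumer.RangeAssignor;
    // switch to round-robin assignment across all subscribed topics:
    props.put("partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);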

Which Broker hosts the Coordinator?

The algorithm for locating the Coordinator is the same as the algorithm for locating a group's target partition in __consumer_offsets:

  • Step 1: determine the target partition: Math.abs(groupId.hashCode()) % 50, where 50 is the default partition count of __consumer_offsets; suppose the result is 12.
  • Step 2: find the Broker hosting the leader replica of partition 12 of __consumer_offsets; that broker is the Group Coordinator.
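
The same computation can be illustrated from the client side. A sketch assuming the default 50 partitions for __consumer_offsets (offsets.topic.num.partitions) and a consumer built as in the examples below:

    String groupId = "test";
    // step 1: the target partition of __consumer_offsets for this group
    int targetPartition = Math.abs(groupId.hashCode()) % 50;

    // step 2: the broker hosting that partition's leader replica is the Coordinator
    for (PartitionInfo p : consumer.partitionsFor("__consumer_offsets")) {
        if (p.partition() == targetPartition) {
            System.out.println("Group Coordinator is broker " + p.leader().id());
        }
    }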

How does the rebalance flow work?

The overall rebalance flow is shown in the figure below; a few points worth emphasizing:

  • The Coordinator role is played by a Broker.

  • The Group Leader role is played by one of the Consumers.

  • The JoinGroup request => used to elect the Group Leader.

  • The SyncGroup request => used by the Group Leader to hand the partition assignment plan to the Coordinator, which then distributes the plan to every Consumer in its responses.

Benefits of the rebalance mechanism

  • The assignment work is delegated to the client-side consumers, so the assignment strategy can be changed without restarting anything.
  • Users can implement their own assignment scheme, e.g. a rack-aware one (see the sketch after this list).
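
A rough skeleton of what a custom strategy can look like, assuming the AbstractPartitionAssignor helper from the kafka-clients internals package (the pre-2.4 API, matching the era of the examples below); the rack lookup itself is left as a placeholder:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
    import org.apache.kafka.common.TopicPartition;

    public class RackAwareAssignor extends AbstractPartitionAssignor {

        @Override
        public String name() {
            return "rack-aware"; // hypothetical strategy name
        }

        @Override
        public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                        Map<String, Subscription> subscriptions) {
            Map<String, List<TopicPartition>> assignment = new HashMap<>();
            // place each partition on a consumer in the same rack first, falling back
            // to any consumer; the rack lookup (e.g. from your own metadata) is up to you
            return assignment;
        }
    }

The class is then registered on each consumer via props.put("partition.assignment.strategy", RackAwareAssignor.class.getName()).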

Rebalance generation: filtering out stale requests

  • Kafka introduced the rebalance generation to reject invalid offset commits to a consumer group. If, for whatever reason, a consumer is late committing its offsets and has meanwhile been kicked out of the group, the generation it carries on its next commit is the old one, so the commit is rejected.
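
A minimal illustration of how a stale generation surfaces to application code: the rejected manual commit shows up as a CommitFailedException.

    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // the group has already rebalanced and handed the partitions to someone else;
        // re-join via the next poll() rather than retrying the commit
        System.err.println("offset commit rejected: " + e.getMessage());
    }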

Rebalance listeners in practice at the application level

  • A rebalance listener addresses the case where the user commits offsets to external storage: in the listener you save the offsets and later seek back to them.

  • onPartitionsRevoked: invoked before a new round of rebalancing starts; typically used for manual offset commits and auditing.

  • onPartitionsAssigned: invoked after the rebalance has completed; typically used to set up the consuming logic.

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // total time spent in rebalances
    final AtomicLong totalRebalanceTimeMs = new AtomicLong(0L);

    // timestamp at which the current rebalance started
    final AtomicLong rebalanceStart = new AtomicLong(0L);

    // 1. register the rebalance listener
    consumer.subscribe(Arrays.asList("test-topic"), new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                // 1. save the current position to external storage
                saveToExternalStore(consumer.position(tp));
                // 2. or commit the offsets manually
                // consumer.commitSync(toCommit);
            }
            rebalanceStart.set(System.currentTimeMillis());
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - rebalanceStart.get());
            for (TopicPartition tp : partitions) {
                // seek back to the offset saved in external storage
                consumer.seek(tp, readFromExternalStore(tp));
            }
        }
    });

    // 2. message processing (buffer, minBatchSize, saveToExternalStore,
    // readFromExternalStore and insertIntoDb are user-supplied)
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }

Balancing consumption within a Consumer group in practice

Single-threaded Consumer wrapping: multiple consumers, one per thread (wastes resources)

Outline of the example:

  • ConsumerGroup wraps the whole group
  • ConsumerRunnable: each thread maintains its own private KafkaConsumer instance

public class Main {

    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String groupId = "testGroup1";
        String topic = "test-topic";
        int consumerNum = 3;

        // the externally facing wrapper
        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}

import java.util.ArrayList;
import java.util.List;

public class ConsumerGroup {

    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }
}

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerRunnable implements Runnable {

    private final KafkaConsumer<String, String> consumer;

    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true"); // this example uses auto offset commits
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic)); // this example uses automatic partition assignment
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
                        "th message with offset: " + record.offset());
            }
        }
    }
}

A single Consumer with multi-threaded processing inside (the lone consumer can become a bottleneck)

Outline of the example:

  • ConsumerHandler: a single Consumer instance whose poll loop dispatches records to a pool of Processor tasks
  • Processor: the business-logic handler

Suggestions for further improvement (a sketch follows the list):

  • Switch ConsumerHandler to manual offset commits and make it solely responsible for the final consumer.commitSync().
  • Keep a global Map<TopicPartition, OffsetAndMetadata> offsets in ConsumerHandler to track the positions the Processors have reached.
  • Have each Processor, after finishing a batch, compute the batch's highest offset and update the offsets map.
  • Have ConsumerHandler commit based on offsets and clear the collection after each commit.
  • Register a rebalance listener in ConsumerHandler.
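
A compact sketch of that bookkeeping, reusing the ConsumerHandler/Processor names from the code below; tp and maxProcessedOffset stand for values a Processor knows after finishing a batch:

    // shared between ConsumerHandler and its Processors
    Map<TopicPartition, OffsetAndMetadata> offsets = new ConcurrentHashMap<>();

    // in Processor, after a batch for partition tp is fully processed:
    // the position to resume from is the highest processed offset + 1
    offsets.merge(tp, new OffsetAndMetadata(maxProcessedOffset + 1),
            (oldV, newV) -> newV.offset() > oldV.offset() ? newV : oldV);

    // in ConsumerHandler's poll loop: commit what the Processors reported, then reset
    if (!offsets.isEmpty()) {
        consumer.commitSync(new HashMap<>(offsets));
        offsets.clear();
    }

A real implementation still has to guard the window between commitSync and clear, e.g. by swapping in a fresh map instead of clearing the shared one.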

public class Main {

    public static void main(String[] args) {
        String brokerList = "localhost:9092,localhost:9093,localhost:9094";
        String groupId = "group2";
        String topic = "test-topic";
        int workerNum = 5;

        ConsumerHandler consumers = new ConsumerHandler(brokerList, groupId, topic);
        consumers.execute(workerNum);
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException ignored) {}
        consumers.shutdown();
    }
}

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerHandler {

    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

        // note: this loop never returns; dispatching is decoupled from processing
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (final ConsumerRecord<String, String> record : records) {
                executors.submit(new Processor(record));
            }
        }
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
        if (executors != null) {
            executors.shutdown();
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }
}

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Processor implements Runnable {

    private final ConsumerRecord<String, String> consumerRecord;

    public Processor(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " consumed " + consumerRecord.partition()
                + "th message with offset: " + consumerRecord.offset());
    }
}

Comparing the two approaches

  • Approach 1 is the recommended one: single-threaded consumers wrapped one per thread. It preserves ordering within each partition and has no thread-switching overhead, at the cost of extra resources.
  • Approach 2 is more complex to implement, and its main problem is that per-partition ordering may be lost; note that message processing is fully decoupled from message receiving.

Consuming from explicitly assigned partitions in practice (Standalone Consumer)

  • A Standalone Consumer uses assign to receive messages from an explicit list of partitions. assign and subscribe are mutually exclusive: pick one.

  • Letting multiple Consumer instances share a Topic through group rebalancing is a natural fit.

  • But for precise control over which partitions are consumed, assign is the only way.

    Properties props = new Properties();
    props.put("bootstrap.servers", brokerList);
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    List<TopicPartition> partitions = new ArrayList<>();

    // look up every partition of the topic
    List<PartitionInfo> allPartitions = consumer.partitionsFor("kaiXinTopic");

    if (allPartitions != null && !allPartitions.isEmpty()) {
        for (PartitionInfo partitionInfo : allPartitions) {
            partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        // assign instead of subscribe: no group rebalance is involved
        consumer.assign(partitions);
    }

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }